﻿using gt.rediscache.core.Connections.Route;
using gt.rediscache.core.Entry;
using gt.rediscache.core.Utility;
using gt.rediscache.logger;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace gt.rediscache.core.Connections.ClientServer
{
    /// <summary>
    /// Pool of <see cref="RedisClientServer"/> instances belonging to one named cache pool.
    /// Tracks every configured client server plus a separate set of the ones currently
    /// considered unhealthy, and raises <see cref="OnClientServerChanged"/> after a
    /// connection-failed / connection-restored notification has been processed.
    /// </summary>
    public class RedisClientServerPool : IDisposable
    {
        // All configured client servers, keyed by BuildKey(clientServer.Name).
        private readonly ConcurrentDictionary<string, RedisClientServer> _clientServerPool;
        // Client servers currently considered unhealthy, keyed the same way.
        private readonly ConcurrentDictionary<string, RedisClientServer> _unHealthClientServerPool;
        private readonly IRedisNodeRouter _router;
        private readonly int _delayedRecoverySeconds; // delayed recovery time, in seconds (-1 when not configured)
        private readonly int _failoverWaitSeconds; // how long to wait for a master/replica failover, in seconds (default 5)

        /// <summary>
        /// Raised after a connection-failed or connection-restored notification of a pooled
        /// client server has been processed. The sender is the affected client server.
        /// </summary>
        public event EventHandler<ClientServerEventArgs> OnClientServerChanged;

        /// <summary>
        /// Creates the pool and one <see cref="RedisClientServer"/> per configured node.
        /// Nodes are created in parallel; a node whose connection is not connected right
        /// after creation is put into the unhealthy set immediately.
        /// </summary>
        /// <param name="options">Pool configuration (name, mode, nodes, timing options).</param>
        /// <param name="routerFactory">Factory used to create the key router for this pool.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="options"/>, its name, or <paramref name="routerFactory"/> is missing,
        /// or the factory returned no router.
        /// </exception>
        /// <exception cref="AggregateException">
        /// Thrown by the parallel loop when creating or registering a node fails (for example
        /// an <see cref="InvalidOperationException"/> for a duplicate client server name).
        /// </exception>
        public RedisClientServerPool(CachePoolConfiguration options,
            RedisNodeRouterFactory routerFactory)
        {
            if (options == null) throw new ArgumentNullException(nameof(options));
            if (string.IsNullOrEmpty(options.Name)) throw new ArgumentNullException(nameof(CachePoolConfiguration.Name));
            if (routerFactory == null) throw new ArgumentNullException(nameof(routerFactory));

            _clientServerPool = new ConcurrentDictionary<string, RedisClientServer>();
            _unHealthClientServerPool = new ConcurrentDictionary<string, RedisClientServer>();
            _router = routerFactory.CreateRouter(options)
                ?? throw new ArgumentNullException(nameof(routerFactory), "CreateRouter returned null.");

            Name = options.Name;
            Mode = options.Mode;
            _delayedRecoverySeconds = options.DelayedRecoverySeconds ?? -1;
            _failoverWaitSeconds = options.RedisFailoverWaitSeconds ?? 5;

            try
            {
                Parallel.ForEach(options.Nodes, redisNode =>
                {
                    var cs = new RedisClientServer(redisNode);

                    // TryAdd is the single atomic duplicate check; a separate ContainsKey
                    // would be racy inside a parallel loop and costs a second lookup.
                    if (!_clientServerPool.TryAdd(BuildKey(cs.Name), cs))
                    {
                        var message = string.Format("redisclientserver pool:{0} already exist clientserver:{1}.", Name, cs.Name);
                        // The losing instance never enters the pool, so nobody else would dispose it.
                        cs.Dispose();
                        throw new InvalidOperationException(message);
                    }

                    cs.OnConnectionFailedEvent += Cs_OnConnectionFailedEvent;
                    cs.OnConnectionRestoredEvent += Cs_OnConnectionRestoredEvent;

                    if (!cs.Connection.IsConnected)
                    {
                        FailedClientServer(cs);
                    }
                });
            }
            catch
            {
                // The constructor is failing, so the caller never receives an instance to
                // dispose: release every client server that was already created and pooled.
                ReleaseClientServers();
                throw;
            }
        }

        #region private

        /// <summary>
        /// Handles a connection-restored notification: processes the restore in the
        /// background and then raises <see cref="OnClientServerChanged"/>.
        /// </summary>
        private void Cs_OnConnectionRestoredEvent(object sender, ClientServerEventArgs e)
        {
            var clientServer = sender as RedisClientServer;
            // Fire-and-forget: the background task logs its own errors and never faults,
            // so the continuation always runs.
            EnsureRestoredClientServerAsync(clientServer)
                .ContinueWith(t => RaiseClientServerChanged(clientServer, e));
        }

        /// <summary>
        /// Handles a connection-failed notification: processes the failure in the
        /// background and then raises <see cref="OnClientServerChanged"/>.
        /// </summary>
        private void Cs_OnConnectionFailedEvent(object sender, ClientServerEventArgs e)
        {
            var clientServer = sender as RedisClientServer;
            // Fire-and-forget: the background task logs its own errors and never faults,
            // so the continuation always runs.
            EnsureFailedClientServerAsync(clientServer)
                .ContinueWith(t => RaiseClientServerChanged(clientServer, e));
        }

        /// <summary>
        /// Raises <see cref="OnClientServerChanged"/> and logs (instead of losing) any
        /// exception thrown by a subscriber, since this runs on a background continuation.
        /// </summary>
        private void RaiseClientServerChanged(RedisClientServer clientServer, ClientServerEventArgs e)
        {
            try
            {
                OnClientServerChanged?.Invoke(clientServer, e);
            }
            catch (Exception ex)
            {
                RedisLogManager.Error("OnClientServerChanged handler error.", ex);
            }
        }

        /// <summary>
        /// Reacts to a failed connection. For up to the configured failover wait time it polls
        /// the connection's endpoints (every 10 ms) for a connected non-replica server; if none
        /// shows up in time, the client server is added to the unhealthy set.
        /// </summary>
        /// <param name="clientServer">The client server whose connection failed.</param>
        /// <returns>A task that completes when the check is done; errors are logged, not thrown.</returns>
        private Task EnsureFailedClientServerAsync(RedisClientServer clientServer)
        {
            return Task.Run(async () =>
            {
                try
                {
                    // UTC avoids bogus elapsed times when local time jumps (DST / time-zone change).
                    var downTime = DateTime.UtcNow;
                    var connected = false;

                    while ((DateTime.UtcNow - downTime).TotalSeconds <= _failoverWaitSeconds)
                    {
                        var newMaster = clientServer.Connection.GetEndPoints().FirstOrDefault(x =>
                        {
                            var server = clientServer.Connection.GetServer(x);
#if NET45
                            return !server.IsSlave && server.IsConnected;
#else
                            return !server.IsReplica && server.IsConnected;
#endif
                        });
                        if (newMaster != null)
                        {
                            connected = true;
                            RedisLogManager.Info($"redisClientServer:{clientServer.Node} reconnect success,new master is {RedisClientUtility.GetHostWithPort(newMaster)}");
                            break;
                        }

                        // Await instead of blocking so no thread-pool thread is held while polling.
                        await Task.Delay(10).ConfigureAwait(false);
                    }
                    if (!connected)
                    {
                        if (_unHealthClientServerPool.TryAdd(BuildKey(clientServer.Name), clientServer))
                            RedisLogManager.Info($"redisClientServer:{clientServer.Node} reconnect failed,add to unhealthy pool.");
                    }
                }
                catch (Exception ex)
                {
                    RedisLogManager.Error("EnsureFailedClientServer error.", ex);
                }
            });
        }

        /// <summary>
        /// Reacts to a restored connection: when a positive delayed-recovery time is configured,
        /// waits that long and then removes the client server from the unhealthy set.
        /// </summary>
        /// <param name="clientServer">The client server whose connection was restored.</param>
        /// <returns>A task that completes when the handling is done; errors are logged, not thrown.</returns>
        private Task EnsureRestoredClientServerAsync(RedisClientServer clientServer)
        {
            return Task.Run(async () =>
            {
                try
                {
                    // NOTE(review): when the delay is <= 0 (the default is -1) nothing is removed
                    // here, so the node stays unhealthy until RestoredClientServer is called.
                    // Confirm this is intended rather than "recover immediately".
                    if (_delayedRecoverySeconds > 0)
                    {
                        // TimeSpan avoids the int overflow of seconds * 1000; awaiting avoids
                        // parking a thread-pool thread for the whole recovery delay.
                        await Task.Delay(TimeSpan.FromSeconds(_delayedRecoverySeconds)).ConfigureAwait(false);
                        if (_unHealthClientServerPool.TryRemove(BuildKey(clientServer.Name), out _))
                            RedisLogManager.Info($"redisClientServer:{clientServer.Node} restored success,remove from unhealthy pool.");
                    }
                }
                catch (Exception ex)
                {
                    RedisLogManager.Error("RestoredClientServer error.", ex);
                }
            });
        }

        /// <summary>
        /// Detaches this pool's handlers from every pooled client server, disposes them
        /// and empties both dictionaries.
        /// </summary>
        private void ReleaseClientServers()
        {
            foreach (var cs in _clientServerPool.Values)
            {
                // Unsubscribe first so a disposed pool is not called back by a client server.
                cs.OnConnectionFailedEvent -= Cs_OnConnectionFailedEvent;
                cs.OnConnectionRestoredEvent -= Cs_OnConnectionRestoredEvent;
                cs.Dispose();
            }
            _clientServerPool.Clear();
            _unHealthClientServerPool.Clear();
        }

        #endregion

        /// <summary>
        /// Name of this pool (taken from the configuration).
        /// </summary>
        public string Name { get; private set; }
        /// <summary>
        /// Mode of this pool (taken from the configuration).
        /// </summary>
        public PoolMode Mode { get; private set; }

        /// <summary>
        /// Gets a client server by its name.
        /// </summary>
        /// <param name="clientServerName">Name of the client server.</param>
        /// <returns>The client server, or <c>null</c> when no such client server is pooled.</returns>
        public RedisClientServer GetClientServerByNodeName(string clientServerName)
        {
            _clientServerPool.TryGetValue(BuildKey(clientServerName), out var cs);
            return cs;
        }
        /// <summary>
        /// Gets the client server that the router selects for a cache key.
        /// </summary>
        /// <param name="key">Cache key to route.</param>
        /// <returns>The selected client server, or <c>null</c> when the routed name is not pooled.</returns>
        public RedisClientServer GetClientServer(string key)
        {
            var clientServerName = _router.GetClientServerName(key);
            _clientServerPool.TryGetValue(BuildKey(clientServerName), out var cs);
            return cs;
        }
        /// <summary>
        /// Gets all pooled client servers.
        /// </summary>
        /// <returns>The values of the internal client server dictionary.</returns>
        public ICollection<RedisClientServer> GetAllRedisClientServer()
        {
            return _clientServerPool.Values;
        }

        /// <summary>
        /// Builds the dictionary key for a client server: "{pool name}_{client server name}".
        /// </summary>
        /// <param name="clientServerName">Name of the client server.</param>
        /// <returns>The dictionary key.</returns>
        private string BuildKey(string clientServerName)
        {
            return string.Concat(Name, "_", clientServerName);
        }

        /// <summary>
        /// Checks whether a client server is healthy, i.e. not in the unhealthy set.
        /// </summary>
        /// <param name="clientServerName">Name of the client server.</param>
        /// <returns><c>true</c> when the client server is not marked unhealthy.</returns>
        public bool CheckHealth(string clientServerName)
        {
            var key = BuildKey(clientServerName);
            return !_unHealthClientServerPool.ContainsKey(key);
        }
        /// <summary>
        /// Marks a client server as failed by adding it to the unhealthy set.
        /// </summary>
        /// <param name="clientServer">The client server to mark.</param>
        public void FailedClientServer(RedisClientServer clientServer)
        {
            var key = BuildKey(clientServer.Name);
            _unHealthClientServerPool.TryAdd(key, clientServer);
        }
        /// <summary>
        /// Marks a client server as restored by removing it from the unhealthy set.
        /// </summary>
        /// <param name="clientServer">The client server to mark.</param>
        public void RestoredClientServer(RedisClientServer clientServer)
        {
            var key = BuildKey(clientServer.Name);
            _unHealthClientServerPool.TryRemove(key, out _);
        }

        /// <summary>
        /// Disposes every pooled client server and clears the pool.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
        /// <summary>
        /// Releases the pooled client servers when called from <see cref="Dispose()"/>.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                ReleaseClientServers();
            }
        }
    }
}
